package com.atuguigu.flink.Day07;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class Example2 {

    /**
     * Flink job that streams {@code SendsorReading}s from {@link SensorSource}
     * into Redis. Every reading becomes one field of the Redis hash "sensor":
     * field = sensor id, value = temperature rendered as a string.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task keeps the demo output deterministic.
        env.setParallelism(1);

        // Jedis connection pool for the Redis host; port/db stay at connector defaults.
        FlinkJedisPoolConfig jedisConfig =
                new FlinkJedisPoolConfig.Builder()
                        .setHost("hadoop104")
                        .build();

        env.addSource(new SensorSource())
                .addSink(new RedisSink<SendsorReading>(jedisConfig, new MyRedisSink()));

        env.execute();
    }

    /**
     * Maps each reading onto a Redis HSET command so that all readings land
     * in a single hash keyed "sensor".
     * NOTE(review): despite the name, this is a {@link RedisMapper}, not a sink;
     * the name is kept because the class is public API.
     */
    public static class MyRedisSink implements RedisMapper<SendsorReading> {

        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET with additional key "sensor": the hash name; the per-record
            // field/value pair comes from the two methods below.
            return new RedisCommandDescription(RedisCommand.HSET, "sensor");
        }

        /** Hash field: the sensor's id. */
        @Override
        public String getKeyFromData(SendsorReading reading) {
            return reading.id;
        }

        /** Hash value: the temperature converted to text via string concatenation. */
        @Override
        public String getValueFromData(SendsorReading reading) {
            return reading.temperture + "";
        }
    }
}
